package com.atguigu.flink.sql.helloworld;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Created by Smexy on 2023/11/20
 *
 * <p>Minimal Table API example: reads text lines from a socket, maps them to
 * {@link WaterSensor} records, converts the resulting stream into a {@link Table},
 * and prints the rows whose {@code id} column equals {@code "s1"}.
 *
 * <p>Which environment to use:
 * <ul>
 *   <li>Batch processing: {@code ExecutionEnvironment}</li>
 *   <li>Stream processing: {@code StreamExecutionEnvironment}</li>
 *   <li>Table processing: {@code TableEnvironment}
 *     <ol>
 *       <li>When the Table is produced by reading external data directly,
 *           create a {@code TableEnvironment} directly.</li>
 *       <li>When the Table is converted from a stream, a
 *           {@code StreamExecutionEnvironment} is required and the table
 *           environment is constructed from it.</li>
 *     </ol>
 *   </li>
 * </ul>
 */
public class Demo1_TableAPI
{
    /**
     * Builds the socket-to-table pipeline, prints the table schema, and then
     * executes the filtered query and prints its result rows.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create the table environment on top of the stream environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<WaterSensor> ds = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        // Convert the stream into a Table.
        Table table = tableEnv.fromDataStream(ds);

        // When the stream's element type is a POJO, the POJO's property names
        // are used as the table's column names.
        table.printSchema();

        /*
            Query the Table object:  select id, ts, vc from <table> where id = 's1'

            Expression: whatever can follow "select", e.g.
                select column, function(column), constant
         */
        Table t1 = table
            .where($("id").isEqual("s1"))
            .select($("id"), $("ts"), $("vc"));

        // Submit the query for execution and print the result rows.
        // Table.execute() submits the job on its own, so env.execute() is
        // deliberately NOT called afterwards: the socket source and map operator
        // are still registered on env, and calling env.execute() would try to
        // submit them again as a second, sink-less DataStream job once this one ends.
        t1.execute().print();
    }
}
